#include "config.h"

#include <dirent.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <libaio.h>
#include <ctype.h>
#include <sys/mman.h>
#include <errno.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "sysy_lib.h"
#include "buffer.h"
#include "schedule.h"
#include "lich_aio.h"
#include "net_global.h"
#include "adt.h"
#include "job_dock.h"
#include "atomic.h"
#include "volume_proto.h"
#include "lichstor.h"
#include "core.h"

#define ECLOG_FLUSH_TMO 1

/*
 * Bundle of arguments used when replaying log records for one specific
 * chunk.  eclog_chunk_redo() fills it in and __eclog_load_seg() forwards the
 * fields to chunk_proto_ec_redo().
 */
typedef struct {
        chkinfo_t *chkinfo;        /* caller-owned chunk info, passed through to the redo call */
        chkstat_t *chkstat;        /* caller-owned chunk status, passed through to the redo call */

        ec_t ec;                   /* by-value copy of the EC parameters */
        const clockstat_t *clocks; /* passed through unchanged to chunk_proto_ec_redo() */
        unsigned char *src_in_err; /* passed through unchanged to chunk_proto_ec_redo() */
} args_t;

STATIC int __eclog_md_load(eclog_t *eclog, eclog_seg_t *seg);

/*
 * Report whether a chunk index lies in the log part of its section.
 *
 * With ECLOG_ENABLE set, the result is non-zero when the chunk's position
 * inside its section (idx modulo the section chunk count) is below the
 * number of log chunks per section.  Without ECLOG_ENABLE it is always 0.
 */
int eclog_chunk_islog(const chkid_t *chkid, const ec_t *ec)
{
#if !ECLOG_ENABLE
        (void) ec;
        (void) chkid;
        return 0;
#else
        return ((chkid->idx % ECLOG_SECTION_CHUNK_COUNT(ec)) < ECLOG_LOG_CHUNK_COUNT(ec));
#endif
}

/*
 * Translate a chunk index into an index with the log chunks removed.
 *
 * With ECLOG_ENABLE set, returns idx minus
 * _ceil(idx, section chunk count) * (log chunks per section).
 * NOTE(review): _ceil is presumably ceiling division -- confirm.
 * Without ECLOG_ENABLE it always returns 0.
 */
int eclog_chunk_skiplog(const chkid_t *chkid, const ec_t *ec)
{
#if !ECLOG_ENABLE
        (void) ec;
        (void) chkid;
        return 0;
#else
        return (chkid->idx
                - _ceil(chkid->idx, ECLOG_SECTION_CHUNK_COUNT(ec)) * ECLOG_LOG_CHUNK_COUNT(ec));
#endif
}

/*
 * Map a chunk id to the index of the log that covers it: the chunk index
 * divided by the number of chunks per section.  The value is also logged.
 *
 * NOTE(review): the log format uses %llu while the type of the division
 * result depends on the macro and on chkid->idx -- confirm they agree.
 */
int __eclog_log_getidx(const chkid_t *chkid, const ec_t *ec)
{
        DINFO("eclog. get logidx. %llu\n",
              chkid->idx / ECLOG_SECTION_CHUNK_COUNT(ec));
        return chkid->idx / ECLOG_SECTION_CHUNK_COUNT(ec);
}

/*
 * Look up the in-memory log object that covers the given chunk.
 *
 * Asserts that the computed log index is below ECLOG_LOG_MAX(ec).  The
 * returned pointer is whatever is stored in volume_proto->eclog[] for that
 * index; callers in this file treat NULL as "not loaded yet".
 */
static eclog_t *__eclog_get(volume_proto_t *volume_proto, const chkid_t *chkid, const ec_t *ec)
{
        uint32_t slot = __eclog_log_getidx(chkid, ec);

        DINFO("eclog. get log. idx(%u), logmaxec(%llu)\n",
              slot, ECLOG_LOG_MAX(ec));
        YASSERT(slot < ECLOG_LOG_MAX(ec));

        return volume_proto->eclog[slot];
}

/*
 * Read _size bytes of this log into _buf.
 *
 * offset is relative to the start of the log; the log's own base offset
 * (ecloginfo.offset) is added before the request is issued through
 * volume_proto_rep_read().  Returns 0 on success or the error code of the
 * underlying read.
 */
STATIC int __eclog_read(eclog_t *eclog, buffer_t *_buf, size_t _size, off_t offset)
{
        int ret;
        io_t rd;
        volume_proto_t *vol = eclog->volume_proto;

        DINFO("eclog. read. chunkid(%llu, %u, %u), offset(%llu), len(%llu)\n",
              (LLU)vol->chkid.id,
              (unsigned int)vol->chkid.type,
              (unsigned int)vol->chkid.idx,
              (LLU)eclog->ecloginfo.offset + offset,
              (LLU)_size);

        io_init(&rd, &vol->chkid, 0,
                eclog->ecloginfo.offset + offset, _size, __FILE_ATTR_ECLOG__);

        ret = volume_proto_rep_read(vol, &rd, _buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Write _size bytes from _buf into this log.
 *
 * offset is relative to the start of the log; the log's base offset
 * (ecloginfo.offset) is added before the request is issued through
 * volume_proto_rep_write() with all-zero io options.  Returns 0 on success
 * or the error code of the underlying write.
 */
STATIC int __eclog_write(eclog_t *eclog, const buffer_t *_buf, size_t _size, off_t offset)
{
        int ret;
        io_t wr;
        io_opt_t opt;
        volume_proto_t *vol = eclog->volume_proto;

        DINFO("eclog. write. chunkid(%llu, %u, %u), offset(%llu), len(%llu)\n",
              (LLU)vol->chkid.id,
              (unsigned int)vol->chkid.type,
              (unsigned int)vol->chkid.idx,
              (LLU)eclog->ecloginfo.offset + offset,
              (LLU)_size);

        io_init(&wr, &vol->chkid, 0,
                eclog->ecloginfo.offset + offset, _size, __FILE_ATTR_ECLOG__);
        io_opt_init(&opt, 0, 0, 0, 0);

        ret = volume_proto_rep_write(vol, &opt, &wr, _buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Fill in eclog->ecloginfo for log number idx.
 *
 * The log size comes from ECLOG_LOG_SIZE_ALIGN(ec) and its base offset is
 * idx sections into the volume; every other field is derived from the size
 * through the __GET_ECLOG_*__ macros.  Always returns 0.
 */
STATIC int __eclog_init_info(eclog_t *eclog, const ec_t *ec, int idx)
{
        ecloginfo_t *info = &eclog->ecloginfo;

        /* where the log lives and how big it is */
        info->size = ECLOG_LOG_SIZE_ALIGN(ec);
        info->offset = idx * ECLOG_SECTION_SIZE_ALIGN(ec);

        /* overall layout derived from the size */
        info->wb_size = __GET_ECLOG_SIZE__(info->size);
        info->md_head = __GET_ECLOG_MD_HEAD__(info->size);
        info->md_total = __GET_ECLOG_MD_TOTAL__(info->size);

        /* segment geometry */
        info->seg_count = __GET_ECLOG_SEG_COUNT__(info->size);
        info->seg_len = __GET_ECLOG_SEG_LEN__(info->size);

        /* per-segment metadata geometry */
        info->seg_md_count = __GET_ECLOG_SEG_MD_COUNT__(info->size);
        info->seg_md_size = __GET_ECLOG_SEG_MD_SIZE__(info->size);

        return 0;
}

/*
 * Persist the in-memory log header (eclog->eclogmd) at offset 0 of the log.
 *
 * Exactly sizeof(eclogmd_t) bytes are copied into a temporary buffer and
 * written; the buffer is released on both the success and failure paths.
 * NOTE(review): the header is allocated with ECLOG_MD_SIZE(seg_count)
 * elsewhere in this file; confirm sizeof(eclogmd_t) covers everything that
 * has to reach the disk.
 */
STATIC int __eclog_updatemd(eclog_t *eclog)
{
        int ret;
        buffer_t out;
        eclogmd_t *hdr = eclog->eclogmd;

        DINFO("update eclog[%u] offset:%ld\n",
                        eclog->eclogid, eclog->ecloginfo.offset);

        mbuffer_init(&out, 0);
        mbuffer_copy(&out, (char *)hdr, sizeof(*hdr));

        ret = __eclog_write(eclog, &out, sizeof(*hdr), 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        mbuffer_free(&out);

        DINFO("update eclog[%u] offset:%ld metadata success \n",
                                eclog->eclogid, eclog->ecloginfo.offset);

        return 0;
err_ret:
        mbuffer_free(&out);
        return ret;
}

/*
 * Make segment idx the current segment of the log.
 *
 * The segment is stamped with the current cursor value, the cursor is
 * advanced, seg_idx is switched to idx and the header is persisted with
 * __eclog_updatemd().  If persisting fails, all three in-memory fields are
 * rolled back to the values they had on entry (previously seg_used was
 * forced to 0, which discarded the stamp of a segment that had been used
 * before).
 *
 * NOTE(review): with a freshly reset header the cursor starts at 0, so the
 * first stamp written here is 0, which other code in this file treats as
 * "unused" -- confirm whether the cursor should be advanced before stamping.
 *
 * Returns 0 on success or the error from __eclog_updatemd().
 */
STATIC int __eclog_use_seg(eclog_t *eclog, int idx)
{
        int ret, last_seg;
        uint64_t last_used;
        eclogmd_t *eclogmd;

        eclogmd = eclog->eclogmd;
        DINFO("eclog[%u] use seg[%u]\n", eclog->eclogid, idx);

        /* remember the state we may have to restore */
        last_seg = eclogmd->seg_idx;
        last_used = eclogmd->seg_info[idx].seg_used;

        eclogmd->seg_info[idx].seg_used = eclogmd->cursor;
        eclogmd->cursor++;
        eclogmd->seg_idx = idx;

        DINFO("eclog. logmdinfo: magic(%u), segidx(%ld), cursor(%llu)\n",
              (unsigned int)eclogmd->magic, eclogmd->seg_idx, (LLU)eclogmd->cursor);
        ret = __eclog_updatemd(eclog);
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        eclogmd->seg_info[idx].seg_used = last_used;
        eclogmd->cursor--;
        eclogmd->seg_idx = last_seg;
        return ret;
}

/*
 * Initialise every in-memory segment descriptor of the log.
 *
 * Each descriptor is zeroed, given its id and its metadata/data offsets,
 * and -- when the header marks the segment as used -- its metadata block is
 * loaded from disk with __eclog_md_load().
 *
 * On failure the metadata blocks already loaded for earlier segments are
 * released here, because descriptors after the failing one have not been
 * initialised and the caller therefore cannot safely walk the array.
 *
 * Returns 0 on success or the error from __eclog_md_load().
 */
STATIC int __eclog_add_seg(eclog_t *eclog)
{
        int ret, i, j;
        eclogmd_t *eclogmd;
        eclog_seg_t *seg;

        eclogmd = eclog->eclogmd;
        for (i = 0; i < eclog->ecloginfo.seg_count; i++) {
                seg = &eclog->seg[i];
                memset(seg, 0x0, sizeof(*seg));

                atomic_set(&seg->dirty, 0);

                seg->segid = i;
                seg->md_offset = eclog->ecloginfo.md_head
                        + seg->segid * eclog->ecloginfo.seg_md_size;
                seg->data_offset = eclog->ecloginfo.md_total
                        + seg->segid * eclog->ecloginfo.seg_len;

                if (eclogmd->seg_info[i].seg_used) {
                        ret = __eclog_md_load(eclog, seg);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                } else {
                        seg->md = NULL;
                }
        }

        return 0;
err_ret:
        /* segments [0, i) are fully initialised; drop what they loaded */
        for (j = 0; j < i; j++) {
                if (eclog->seg[j].md != NULL) {
                        yfree((void **)&eclog->seg[j].md);
                        eclog->seg[j].md = NULL;
                }
        }
        return ret;
}

/*
 * Zero the on-disk header and the metadata area of a log.
 *
 * First a zeroed eclogmd_t is written at offset 0, then zero-filled blocks
 * of LICH_CHUNK_SPLIT bytes are written over the metadata range.
 *
 * __eclog_write() reports failure with a non-zero error code, so any
 * non-zero result is treated as an error and returned as is.  (The earlier
 * "ret < 0 / errno" test never matched those codes and let failed writes
 * pass silently.)
 *
 * NOTE(review): the loop starts at off / LICH_CHUNK_SPLIT and ends at
 * _ceil(total - off, LICH_CHUNK_SPLIT); confirm that these bounds cover the
 * intended metadata range.
 *
 * Returns 0 on success or the error from __eclog_write().
 */
int __eclog_reset(eclog_t *eclog)
{
        int ret;
        uint32_t i;
        off_t off;
        size_t total;
        buffer_t mbuf;
        ec_t *ec;

        ec = &eclog->volume_proto->table1.fileinfo.ec;

        mbuffer_init(&mbuf, 0);
        mbuffer_appendzero(&mbuf, sizeof(eclogmd_t));
        ret = __eclog_write(eclog, &mbuf, sizeof(eclogmd_t), 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* start over with a clean buffer holding one zeroed block */
        mbuffer_free(&mbuf);
        mbuffer_init(&mbuf, 0);
        mbuffer_appendzero(&mbuf, LICH_CHUNK_SPLIT);

        off = __GET_ECLOG_MD_OFFSET__(ECLOG_LOG_SIZE_ALIGN(ec));
        total = __GET_ECLOG_MD_TOTAL__(ECLOG_LOG_SIZE_ALIGN(ec)) - __GET_ECLOG_MD_HEAD__(ECLOG_LOG_SIZE_ALIGN(ec));

        for (i = off/LICH_CHUNK_SPLIT; i < _ceil(total - off, LICH_CHUNK_SPLIT); i++) {
                /* widen before multiplying so the byte offset cannot wrap in 32 bits */
                ret = __eclog_write(eclog, &mbuf, LICH_CHUNK_SPLIT, (off_t)i * LICH_CHUNK_SPLIT);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        mbuffer_free(&mbuf);
        return 0;
err_ret:
        mbuffer_free(&mbuf);
        return ret;
}

/*
 * Load the metadata block of one segment from disk into a freshly allocated
 * buffer and attach it to seg->md.
 *
 * seg->md must be NULL on entry (asserted).  The read must return exactly
 * seg_md_size bytes (asserted).  On failure the allocation and the
 * temporary read buffer are released and seg->md is left untouched.
 */
STATIC int __eclog_md_load(eclog_t *eclog, eclog_seg_t *seg)
{
        int ret;
        void *mdmem;
        buffer_t rbuf;

        ret = ymalloc((void **)&mdmem, eclog->ecloginfo.seg_md_size);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG("load eclog[%u] seg[%ld] offset:%ld, size:%ld, md:%p\n",
                                eclog->eclogid, seg->segid,
                                seg->md_offset, eclog->ecloginfo.seg_md_size, mdmem);

        mbuffer_init(&rbuf, 0);
        ret = __eclog_read(eclog, &rbuf, eclog->ecloginfo.seg_md_size, seg->md_offset);
        if (unlikely(ret))
                GOTO(err_free, ret);

        /* flatten what was read into the heap block */
        YASSERT(rbuf.len == (uint32_t)eclog->ecloginfo.seg_md_size);
        mbuffer_get(&rbuf, mdmem, rbuf.len);
        mbuffer_free(&rbuf);

        YASSERT(seg->md == NULL);
        seg->md = mdmem;

        return 0;
err_free:
        yfree((void **)&mdmem);
        mbuffer_free(&rbuf);
err_ret:
        return ret;
}

/*
 * Allocate the two heap areas a log object needs: the segment descriptor
 * array (seg_count entries) and the header buffer of
 * ECLOG_MD_SIZE(seg_count) bytes.
 *
 * If the second allocation fails the first one is released again.
 * Returns 0 on success or the ymalloc() error.
 */
static int __eclog_allocate(eclog_t *eclog)
{
        int ret;

        /* segment descriptors */
        ret = ymalloc((void **)&eclog->seg,
                      eclog->ecloginfo.seg_count * sizeof(eclog_seg_t));
        if (ret)
                GOTO(err_ret, ret);

        /* log header */
        ret = ymalloc((void **)&eclog->eclogmd,
                      ECLOG_MD_SIZE(eclog->ecloginfo.seg_count));
        if (ret)
                GOTO(err_free, ret);

        return 0;
err_free:
        yfree((void **)&eclog->seg);
err_ret:
        return ret;
}

/*
 * Create the in-memory object for log number idx and bring it up.
 *
 * Steps: allocate the object, compute its layout, allocate the segment
 * array and header buffer, read the header from disk, and
 *   - if the magic does not match, reset the on-disk log and write a fresh
 *     header;
 *   - if the magic matches but redo is false, assert.
 * Then the lock is initialised and the segment descriptors are set up.
 *
 * On success *_eclog receives the new object.  On any failure everything
 * allocated here (the object, its segment array and its header buffer) is
 * released; earlier versions leaked some of these on several error paths.
 *
 * NOTE(review): the lock created by plock_init() is not torn down when
 * __eclog_add_seg() fails; no destroy routine is visible in this file.
 *
 * Returns 0 on success or the first error encountered.
 */
int __eclog_log_load(volume_proto_t *volume_proto, eclog_t **_eclog,
                const ec_t *ec, int idx, int redo)
{
        int ret;
        eclog_t *eclog;
        eclogmd_t *eclogmd;
        buffer_t buf;
        char uuid[UUID_LEN] = {};

        ret = ymalloc((void **)&eclog, sizeof(eclog_t));
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        eclog->volume_proto = volume_proto;
        eclog->eclogid = idx;

        ret = __eclog_init_info(eclog, ec, idx);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        /* __eclog_allocate() cleans up after itself; only eclog is ours */
        ret = __eclog_allocate(eclog);
        if (ret)
                GOTO(err_free, ret);

        YASSERT(sizeof(*eclogmd) <= eclog->ecloginfo.md_head);
        eclogmd = eclog->eclogmd;

        mbuffer_init(&buf, 0);
        ret = __eclog_read(eclog, &buf, sizeof(*eclogmd), 0);
        if (unlikely(ret))
                GOTO(err_buf, ret);

        //YASSERT(buf.len == sizeof(*eclogmd));
        mbuffer_get(&buf, eclogmd, buf.len);
        mbuffer_free(&buf);

        uuid_unparse(eclogmd->uuid, uuid);

        /* buf is already released from here on: use err_md, not err_buf */
        if (eclogmd->magic != __ECLOG_MAGIC__) {
                DINFO("init reset ec log\n");
                ret = __eclog_reset(eclog);
                if (unlikely(ret))
                        GOTO(err_md, ret);

                memset(eclogmd, 0x0, sizeof(*eclogmd));
                eclogmd->magic = __ECLOG_MAGIC__;
                ret = __eclog_updatemd(eclog);
                if (unlikely(ret))
                        GOTO(err_md, ret);
                DINFO("init reset ec log successful.\n");
        } else if (!redo) {
                YASSERT(0);
        }

        ret = plock_init(&eclog->rwlock, "eclog");
        if (unlikely(ret))
                GOTO(err_md, ret);

        DINFO("eclog. add seg. \n");
        ret = __eclog_add_seg(eclog);
        if (unlikely(ret))
                GOTO(err_md, ret);

        DINFO("eclog. add seg successful.\n");

        *_eclog = eclog;

        return 0;
err_buf:
        mbuffer_free(&buf);
err_md:
        yfree((void **)&eclog->eclogmd);
        yfree((void **)&eclog->seg);
err_free:
        yfree((void **)&eclog);
err_ret:
        return ret;
}

/*
 * Read the payload that belongs to one metadata record.
 *
 * md->size bytes are read at md->seg_offset inside the segment's data area
 * (seg->data_offset).  __eclog_read() reports failure with a non-zero error
 * code, so any non-zero result is propagated unchanged.  (The earlier
 * "ret < 0 / errno" test never matched and swallowed read errors.)
 */
STATIC int __eclog_load_seg_read(eclog_t *eclog, eclog_seg_t *seg,
                const ec_md_t *md, buffer_t *buf)
{
        int ret;

        ret = __eclog_read(eclog, buf, md->size, md->seg_offset + seg->data_offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Replay one logged write against its chunk.
 *
 * The chunk is prepared with volume_proto_chunk_pre_write(), the logged
 * payload in buf is split into ec->m strips of STRIP_BLOCK * count bytes
 * each (buf must hold exactly that much, asserted), and the strips are
 * handed to chunk_proto_ec_write_commit().  The strips are released on both
 * the success and the failure path.
 *
 * NOTE(review): io.size is set to count * ec->k without a STRIP_BLOCK
 * factor, unlike io.offset -- confirm the intended unit.
 */
STATIC int __eclog_redo(eclog_t *eclog, const chkid_t *chkid, uint32_t off, uint32_t count, buffer_t *buf)
{
        int ret, n;
        char _chkstat[CHKSTAT_MAX], _chkinfo[CHKINFO_MAX];
        chkinfo_t *chkinfo = (void *)_chkinfo;
        chkstat_t *chkstat = (void *)_chkstat;
        ec_t *ec = &eclog->volume_proto->table1.fileinfo.ec;
        io_t io;
        io_opt_t io_opt;
        chunk_io_t chunk_io;
        buffer_t strips[EC_MMAX];

        io_opt_init(&io_opt, 0, 0, 0, 0);

        chunk_io.io.id = *chkid;
        chunk_io.chkinfo = chkinfo;
        chunk_io.chkstat = chkstat;
        ret =  volume_proto_chunk_pre_write(eclog->volume_proto, &io_opt, &chunk_io);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        io = chunk_io.io;
        io.offset = off * STRIP_BLOCK;
        io.size = count * ec->k;

        /* carve the logged payload into one buffer per strip */
        YASSERT(buf->len == STRIP_BLOCK * count * ec->m);
        for (n = 0; n < ec->m; n++) {
                mbuffer_init(&strips[n], 0);
                mbuffer_pop(buf, &strips[n], STRIP_BLOCK * count);
        }

        ret = chunk_proto_ec_write_commit(chkinfo, chkstat, &io, ec, off, count, strips);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        for (n = 0; n < ec->m; n++) {
                mbuffer_free(&strips[n]);
        }

        return 0;
err_free:
        for (n = 0; n < ec->m; n++) {
                mbuffer_free(&strips[n]);
        }
err_ret:
        return ret;
}

/*
 * Walk the metadata records of one segment and replay them.
 *
 * Records are processed in order until an entry with chkid.id == 0 is
 * found, seg_md_count entries have been seen, or a record fails its data
 * CRC or metadata CRC check (the walk stops there without error).
 *
 *  - chkid == NULL: every valid record is replayed through __eclog_redo()
 *    and, after the walk, the in-memory metadata block is zeroed.
 *  - chkid != NULL: only records for that chunk are replayed, through
 *    chunk_proto_ec_redo() using the fields in args.
 *
 * The payload buffer is released on every exit from the loop body,
 * including the CRC-mismatch exits (which used to leak it).  The metadata
 * CRC error message reports the stored metadata CRC.
 *
 * Returns 0 on success or the first read/redo error.
 */
STATIC int __eclog_load_seg(eclog_t *eclog, eclog_seg_t *seg,
                const chkid_t *chkid, args_t *args)
{
        int ret;
        uint32_t i, crc;
        size_t len;
        ec_md_t *md;
        buffer_t buf;

        for (i = 0; i < eclog->ecloginfo.seg_md_count; i++) {
                md = &seg->md[i];

                /* an all-zero chunk id marks the end of the used records */
                if (md->chkid.id == 0) {
                        break;
                }

                DINFO("eclog. load seg. seginfo: chkid(%llu, %u, %u), size(%ld), "
                      "offset(%ld), datacrc(%u), metacrc(%u), segoffset(%ld)\n",
                      (LLU)md->chkid.id,(unsigned int)md->chkid.type,
                      (unsigned int)md->chkid.idx, md->size, md->offset,
                      (unsigned int)md->data_crc, (unsigned int)md->meta_crc,
                      md->seg_offset);
                mbuffer_init(&buf, 0);

                ret = __eclog_load_seg_read(eclog, seg, md, &buf);
                if (ret)
                        GOTO(err_ret, ret);

                /* payload integrity */
                crc = mbuffer_crc(&buf, 0, buf.len);
                if (crc != md->data_crc) {
                        if (!chkid) {
                                DERROR("chunk "CHKID_FORMAT" eclog[%d] seg[%ld] offset %ld size %ld @ %ld(%llu), crc %x %x\n",
                                                CHKID_ARG(&md->chkid), eclog->eclogid, seg->segid, md->offset,
                                                md->size, md->seg_offset, (LLU)md->seg_offset + seg->data_offset,
                                                crc, md->data_crc);
                        }
                        mbuffer_free(&buf);
                        break;
                }

                /* record integrity: CRC over the fields that precede meta_crc */
                len = sizeof(md->chkid) +
                        sizeof(md->size) +
                        sizeof(md->offset) +
                        sizeof(md->data_crc) +
                        sizeof(md->seg_offset);
                crc = crc32_sum(md, len);
                if (crc != md->meta_crc) {
                        if (!chkid) {
                                DERROR("chunk "CHKID_FORMAT" eclog[%d] seg[%ld] offset %ld size %ld @ %ld(%llu), crc %x %x\n",
                                                CHKID_ARG(&md->chkid), eclog->eclogid, seg->segid, md->offset,
                                                md->size, md->seg_offset, (LLU)md->seg_offset + seg->data_offset,
                                                crc, md->meta_crc);
                        }
                        mbuffer_free(&buf);
                        break;
                }

                YASSERT(chkid_isvalid(&md->chkid));

                if (!chkid) {
                        DINFO("chunk redo "CHKID_FORMAT" offset %ld size %ld\n",
                                        CHKID_ARG(&md->chkid), md->offset, md->size);

                        ret = __eclog_redo(eclog, &md->chkid, md->offset, md->size, &buf);
                        if (ret)
                                GOTO(err_ret, ret);
                } else if (!chkid_cmp(&md->chkid, chkid)) {
                        DINFO("chunk redo "CHKID_FORMAT" offset %ld size %ld\n",
                                        CHKID_ARG(&md->chkid), md->offset, md->size);

                        ret = chunk_proto_ec_redo(args->chkinfo, args->chkstat, args->clocks,
                                        &args->ec, md->offset, md->size, &buf, args->src_in_err);
                        if (ret)
                                GOTO(err_ret, ret);
                }

                mbuffer_free(&buf);
        }

        if (!chkid)
                memset(seg->md, 0x0, eclog->ecloginfo.seg_md_size);

        return 0;
err_ret:
        mbuffer_free(&buf);
        return ret;
}

/*
 * Replay a whole log, oldest segment first.
 *
 * Among the segments whose metadata is loaded, the one with the smallest
 * seg_used stamp is the starting point.  From there the segments are
 * visited in ring order until one with seg_used == 0 is reached or all
 * have been visited; metadata is loaded on demand and each segment is
 * replayed with __eclog_load_seg().  chkid and args are forwarded as is.
 *
 * Returns 0 when the log is empty or fully replayed, otherwise the first
 * error.
 */
STATIC int __eclog_log_redo(eclog_t *eclog, const chkid_t *chkid, args_t *args)
{
        int ret, i, idx;
        int begin = -1;
        uint64_t lowest = UINT64_MAX;
        eclogmd_t *eclogmd = eclog->eclogmd;
        eclog_seg_t *seg;

        DINFO("eclog. log redo. logid(%d)\n", eclog->eclogid);

        /* pass 1: find the loaded segment with the smallest stamp */
        for (i = 0; i < eclog->ecloginfo.seg_count; i++) {
                seg = &eclog->seg[i];
                if (!seg->md)
                        continue;

                if (eclogmd->seg_info[i].seg_used < lowest) {
                        lowest = eclogmd->seg_info[i].seg_used;
                        begin = i;
                }
        }

        if (begin == -1) {
                DINFO("eclog[%u] empty\n", eclog->eclogid);
                return 0;
        }

        /* pass 2: replay in ring order starting from that segment */
        for (i = 0; i < eclog->ecloginfo.seg_count; i++) {
                idx = (i + begin) % eclog->ecloginfo.seg_count;

                if (eclogmd->seg_info[idx].seg_used == 0)
                        break;

                seg = &eclog->seg[idx];
                if (seg->md == NULL) {
                        ret = __eclog_md_load(eclog, seg);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                ret = __eclog_load_seg(eclog, seg, chkid, args);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        DINFO("eclog. log redo fin. logid(%d)\n", eclog->eclogid);

        return 0;
err_ret:
        return ret;
}

/*
 * Switch the log to segment idx.
 *
 * Fails with ENOSPC while the target segment still has a non-zero dirty
 * count; otherwise the switch is made (and persisted) by
 * __eclog_use_seg().
 */
STATIC int __eclog_next(eclog_t *eclog, int idx)
{
        int ret;
        eclog_seg_t *target = &eclog->seg[idx];

        if (atomic_read(&target->dirty)) {
                ret = ENOSPC;
                GOTO(err_ret, ret);
        }

        ret = __eclog_use_seg(eclog, idx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("use eclog[%u], idx [%d]\n", eclog->eclogid, idx);

        return 0;
err_ret:
        return ret;
}

/*
 * Reserve room for a payload of `size` bytes in the current segment and
 * report where it goes.
 *
 * Under the log's write lock: if the current segment cannot take the
 * payload plus its alignment padding, the log moves on to the next segment
 * in ring order via __eclog_next().  After the lock is dropped, loc->offset
 * and loc->segid are filled in, the segment's write cursor is advanced and
 * its dirty counter is incremented.
 *
 * NOTE(review): when size is already a multiple of ECLOG_ALIGN_SIZE the
 * padding computed below is a full ECLOG_ALIGN_SIZE, not 0 -- confirm this
 * is intended.
 * NOTE(review): seg->cur is read and advanced after the lock is released --
 * confirm callers cannot interleave here.
 *
 * Returns 0 on success, or the error from plock_wrlock()/__eclog_next().
 */
STATIC int __eclog_getlocation__(eclog_t *eclog, int size, ecloc_t *loc)
{
        int ret, next, align;
        eclog_seg_t *seg;
        eclogmd_t *eclogmd;

        ret = plock_wrlock(&eclog->rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        eclogmd = eclog->eclogmd;
        seg = &eclog->seg[eclogmd->seg_idx];

        /* bytes of padding added after the payload */
        align = ECLOG_ALIGN_SIZE - size % ECLOG_ALIGN_SIZE;

        /* not enough room left: advance to the next segment in the ring */
        if ((uint64_t)(seg->cur + size + align) > eclog->ecloginfo.seg_len) {
                next = (seg->segid + 1) % eclog->ecloginfo.seg_count;
                ret = __eclog_next(eclog, next);
                if (unlikely(ret))
                        GOTO(err_lock, ret);

                seg = &eclog->seg[next];
        }

        plock_unlock(&eclog->rwlock);
        YASSERT(atomic_read(&seg->dirty) >= 0);

        /* offset is relative to the log: segment data base + write cursor */
        loc->offset = seg->cur + seg->data_offset;
        loc->segid = seg->segid;
        seg->cur += (size + align);
        atomic_inc(&seg->dirty);

        return 0;
err_lock:
        plock_unlock(&eclog->rwlock);
err_ret:
        return ret;
}

/*
 * Obtain a write location of `len` bytes in the log.
 *
 * loc->eclogid is set first; offset and segment id come from
 * __eclog_getlocation__().  Errors other than ENOSPC are returned directly.
 *
 * NOTE(review): the ENOSPC handling is marked UNIMPLEMENTED.  As written,
 * after schedule_yield() returns the function reports success without
 * calling __eclog_getlocation__() again, so loc->offset and loc->segid are
 * not filled in on that path.
 */
STATIC int __eclog_getlocation(eclog_t *eclog, int len, ecloc_t *loc)
{
        int ret, wait = 0;

        ANALYSIS_BEGIN(0);

        loc->eclogid = eclog->eclogid;
        ret = __eclog_getlocation__(eclog, len, loc);
        if (unlikely(ret)) {
                if (ret == ENOSPC) {
                        /* next segment is still dirty: fall into the wait path */
                        UNIMPLEMENTED(__DUMP__);
                        wait = 1;
                } else
                        GOTO(err_ret, ret);
        }

        if (wait) {
                UNIMPLEMENTED(__DUMP__);
                DWARN("eclog wait\n");
                ret = schedule_yield("eclog_wait", NULL, NULL);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
                DWARN("eclog wait resume\n");
        }

        ANALYSIS_QUEUE(0, 1000, "eclog_getlocation");

        return 0;
err_ret:
        return ret;
}

/*
 * Undo the dirty-count increment of a previously reserved location.
 *
 * Only the dirty counter of the segment named by loc->segid is decremented;
 * the segment's write cursor is not moved back.  Always returns 0.
 */
STATIC int __eclog_cancel(eclog_t *eclog, const ecloc_t *loc)
{
        eclog_seg_t *owner = &eclog->seg[loc->segid];

        YASSERT(atomic_read(&owner->dirty) >= 0);
        atomic_dec(&owner->dirty);

        DBUG("eclog[%u] seg[%u] offset %ld dirty %u, cur %ld\n", eclog->eclogid,
              loc->segid, loc->offset, atomic_read(&owner->dirty), owner->cur);

        return 0;
}

/*
 * Append a payload to the log.
 *
 * Must run inside the scheduler (asserted).  A location large enough for
 * buf->len bytes is reserved, then the payload is written there.  If the
 * write fails the reservation's dirty count is dropped again via
 * __eclog_cancel().  On success *loc describes where the data was put.
 */
STATIC int __eclog_data(eclog_t *eclog, const buffer_t *buf, ecloc_t *loc)
{
        int ret;

        YASSERT(schedule_running());

        /* step 1: reserve space */
        ret = __eclog_getlocation(eclog, buf->len, loc);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* step 2: store the payload at the reserved offset */
        ret = __eclog_write(eclog, buf, buf->len, loc->offset);
        if (unlikely(ret))
                GOTO(err_cancel, ret);

        return 0;
err_cancel:
        __eclog_cancel(eclog, loc);
err_ret:
        return ret;
}

/*
 * Record one metadata entry describing a payload already written to the
 * log, and persist that entry.
 *
 * The segment's metadata block is loaded on demand (under the write lock).
 * The next free in-memory slot, seg->md[seg->md_count], is filled with the
 * chunk id, size, offset, payload CRC and the payload's offset inside the
 * segment; meta_crc is the CRC of those leading fields.  The entry is then
 * written to the same slot index on disk.
 *
 * Two defects are fixed relative to the earlier code:
 *  - the metadata CRC is computed over the record itself (it used to be
 *    computed over the address of the local pointer variable, so it could
 *    never match the check in __eclog_load_seg());
 *  - the on-disk position is computed from the slot index before md_count
 *    is incremented, so memory slot k and disk slot k coincide (entries
 *    used to land one slot too far, leaving disk slot 0 empty).
 *
 * NOTE(review): md_count is not checked against seg_md_count here.
 *
 * Returns 0 on success or the first lock/load/write error.
 */
STATIC int __eclog_md(eclog_t *eclog, const ecloc_t *loc, const chkid_t *chkid,
                          int offset, int size, uint32_t crc)
{
        int ret;
        size_t len;
        off_t slot_off;
        eclog_seg_t *seg;
        ec_md_t *md;
        buffer_t buf;

        mbuffer_init(&buf, 0);

        ret = plock_wrlock(&eclog->rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        seg = &eclog->seg[loc->segid];
        if (seg->md == NULL) {
                ret = __eclog_md_load(eclog, seg);
                if (unlikely(ret))
                        GOTO(err_lock, ret);
        }

        plock_unlock(&eclog->rwlock);

        md = &seg->md[seg->md_count];
        md->chkid = *chkid;
        md->size = size;
        md->offset = offset;
        md->data_crc = crc;
        md->seg_offset = loc->offset - seg->data_offset;
        YASSERT(loc->offset - seg->data_offset < UINT32_MAX);
        len = sizeof(md->chkid) +
                sizeof(md->size) +
                sizeof(md->offset) +
                sizeof(md->data_crc) +
                sizeof(md->seg_offset);
        /* checksum the record contents, matching the verification on load */
        md->meta_crc = crc32_sum(md, len);

        DBUG("update eclog[%u] seg[%ld], md[%ld]\n",
                        eclog->eclogid, seg->segid, seg->md_count);

        /* disk slot of this entry == its in-memory index */
        slot_off = seg->md_offset + seg->md_count * sizeof(*md);
        seg->md_count++;

        mbuffer_copy(&buf, (char *)md, sizeof(*md));

        ret = __eclog_write(eclog, &buf, sizeof(*md), slot_off);
        if (unlikely(ret))
                GOTO(err_free, ret);

        mbuffer_free(&buf);

        return 0;
err_lock:
        plock_unlock(&eclog->rwlock);
err_free:
        mbuffer_free(&buf);
err_ret:
        return ret;
}

/*
 * Log one EC write before it is applied.
 *
 * Does nothing (returns 0 without touching the out parameters) when
 * EC_ISLOG(ec) is false.  Otherwise the log covering io->id is looked up,
 * loaded on first use, the ec->m strips are concatenated into one buffer
 * (which must be STRIP_BLOCK * count * ec->m bytes, asserted), the payload
 * is appended to the log and a metadata entry with its CRC is recorded.
 *
 * On success *_eclog and *_loc identify the log and the location used.
 * If recording the metadata fails, the location's dirty count is dropped
 * again.  Returns 0 on success or the first error.
 */
int chunk_proto_eclog_write(volume_proto_t *volume_proto, const io_t *io, const ec_t *ec,
                uint32_t off, uint32_t count, buffer_t *strips, eclog_t **_eclog, ecloc_t *_loc)
{
        int ret, idx;
        uint32_t n, crc;
        eclog_t *eclog;
        ecloc_t loc;
        buffer_t buf;

        if(!EC_ISLOG(ec)) {
                return 0;
        }

        /* find the log, loading it the first time it is needed */
        eclog = __eclog_get(volume_proto, &io->id, ec);
        if (!eclog) {
                idx = __eclog_log_getidx(&io->id, ec);
                ret = __eclog_log_load(volume_proto, &eclog, ec, idx, FALSE);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                volume_proto->eclog[idx] = eclog;
        }

        /* gather all strips into one contiguous payload */
        mbuffer_init(&buf, 0);
        for (n = 0; n < ec->m; n++) {
                ret = mbuffer_append(&buf, &strips[n]);
                if (unlikely(ret)) {
                        GOTO(err_free, ret);
                }
        }

        YASSERT(buf.len == STRIP_BLOCK * count * ec->m);

        crc = mbuffer_crc(&buf, 0, buf.len);

        /* payload first, then the metadata entry that describes it */
        ret = __eclog_data(eclog, &buf, &loc);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = __eclog_md(eclog, &loc, &io->id, off, count, crc);
        if (unlikely(ret))
                GOTO(err_canel, ret);

        mbuffer_free(&buf);

        *_loc = loc;
        *_eclog = eclog;

        return 0;
err_canel:
        __eclog_cancel(eclog, &loc);
err_free:
        mbuffer_free(&buf);
err_ret:
        return ret;
}

/*
 * Mark one logged write as no longer outstanding by decrementing the dirty
 * counter of the segment named in loc.  The counter is logged before and
 * after the decrement.  Always returns 0.
 */
int __eclog_log_cleanup(eclog_t *eclog, ecloc_t *loc)
{
        eclog_seg_t *owner = &eclog->seg[loc->segid];

        DBUG("eclog. dirtyinfo. dec seg dirty. segid(%d), dirty(%u)\n",
                     loc->segid, atomic_read(&owner->dirty));

        atomic_dec(&owner->dirty);

        DBUG("eclog[%u] seg[%d] offset %ld dirty %u, cur %ld\n", eclog->eclogid,
              loc->segid, loc->offset, atomic_read(&owner->dirty), owner->cur);

        return 0;
}

/*
 * Find the loaded segment with the smallest seg_used stamp.
 *
 * Only segments whose metadata is loaded (md != NULL) are considered.  The
 * scan stops early once the volume's destroy flag is set.  *_min receives
 * the segment index, or -1 when no candidate was seen.  Always returns 0.
 */
STATIC int __eclog_md_getmin(eclog_t *eclog, int *_min)
{
        int n;
        int oldest = -1;
        uint64_t lowest = UINT64_MAX;
        eclogmd_t *eclogmd = eclog->eclogmd;
        eclog_seg_t *seg;

        for (n = 0; n < eclog->ecloginfo.seg_count; n++) {
                seg = &eclog->seg[n];

                if (eclog->volume_proto->destroy)
                        break;

                if (!seg->md)
                        continue;

                if (eclogmd->seg_info[n].seg_used < lowest) {
                        lowest = eclogmd->seg_info[n].seg_used;
                        oldest = n;
                }

                DBUG("flush eclog[%u] seg[%ld], seq %llu\n",
                                eclog->eclogid, seg->segid, (LLU)lowest);
        }

        *_min = oldest;

        return 0;
}

/*
 * Retire one segment: wipe its metadata on disk and release it in memory.
 *
 * The in-memory metadata block is zeroed and written back over the
 * segment's metadata area; then the block is freed, the segment's entry
 * count and write cursor are reset, its seg_used stamp in the header is
 * cleared, and the header is persisted.
 *
 * If the metadata write fails nothing is freed or reset (the in-memory
 * block has already been zeroed, though).  Returns 0 on success or the
 * first write error.
 */
STATIC int __eclog_md_unload__(eclog_t *eclog, eclog_seg_t *seg)
{
        int ret;
        buffer_t zeros;
        eclogmd_t *hdr = eclog->eclogmd;

        DINFO("unload eclog[%u] seg[%ld] md:%p\n", eclog->eclogid, seg->segid, seg->md);

        /* wipe in memory, then push the wiped block to disk */
        memset(seg->md, 0x0, eclog->ecloginfo.seg_md_size);
        mbuffer_init(&zeros, 0);
        mbuffer_copy(&zeros, (char *)seg->md, eclog->ecloginfo.seg_md_size);

        ret = __eclog_write(eclog, &zeros, eclog->ecloginfo.seg_md_size, seg->md_offset);
        if (unlikely(ret))
                GOTO(err_free, ret);

        mbuffer_free(&zeros);

        /* segment is empty again */
        yfree((void **)&seg->md);
        seg->md = NULL;
        seg->md_count = 0;
        seg->cur = 0;

        hdr->seg_info[seg->segid].seg_used = 0;
        ret = __eclog_updatemd(eclog);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_free:
        mbuffer_free(&zeros);
err_ret:
        return ret;
}

/*
 * Retire finished segments, walking the ring from `min` up to (but not
 * including) the current segment.
 *
 * A segment is retired when its metadata is loaded, it is not the current
 * segment and its dirty counter is <= 0; the counter is forced to 0 first.
 * Loaded, non-current segments that are still dirty are only logged.
 * min == -1 means there is nothing to do.  The walk stops early once the
 * volume's destroy flag is set.
 *
 * Returns 0 on success or the error from __eclog_md_unload__().
 */
STATIC int __eclog_md_unload(eclog_t *eclog, int min)
{
        int ret;
        int idx = min;
        eclogmd_t *eclogmd = eclog->eclogmd;
        eclog_seg_t *seg;

        if (min == -1) {
                return 0;
        }

        for (; idx != eclogmd->seg_idx; idx = (idx + 1) % eclog->ecloginfo.seg_count) {
                if (eclog->volume_proto->destroy)
                        break;

                YASSERT(idx > -1 && idx < eclog->ecloginfo.seg_count);
                seg = &eclog->seg[idx];

                DINFO("eclog. segid(%ld), segidx(%ld), dirty(%d), seq(%d)\n",
                        seg->segid, eclogmd->seg_idx, atomic_read(&seg->dirty), min);

                /* only loaded, non-current segments are of interest */
                if (seg->md != NULL && eclogmd->seg_idx != seg->segid) {
                        if (atomic_read(&seg->dirty) <= 0) {
                                atomic_set(&seg->dirty, 0);
                                ret = __eclog_md_unload__(eclog, seg);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);
                        } else {
                                DINFO("eclog[%u] seg[%ld] dirty %u, cur %ld\n", eclog->eclogid,
                                                seg->segid, atomic_read(&seg->dirty), seg->cur);
                        }
                }
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Public entry point that releases a location obtained from
 * chunk_proto_eclog_write().  A NULL eclog is accepted and ignored;
 * otherwise the call is forwarded to __eclog_log_cleanup().
 */
int chunk_proto_eclog_clean(eclog_t *eclog, ecloc_t *ecloc)
{
        int ret;

        if(!eclog) {
                return 0;
        }

        ret = __eclog_log_cleanup(eclog, ecloc);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Replay the log records that belong to one chunk.
 *
 * Returns 0 immediately when EC_ISEC(ec) is false.  Otherwise the log that
 * covers chkinfo->id must already be loaded (asserted) and is replayed with
 * the chunk id as filter; the remaining arguments are packed into an
 * args_t and handed to the replay.
 */
int eclog_chunk_redo(volume_proto_t *volume_proto, chkinfo_t *chkinfo, chkstat_t *chkstat,
                const ec_t *ec, clockstat_t *clocks, unsigned char *src_in_err)
{
        int ret;
        eclog_t *eclog;
        args_t args;

        if (!EC_ISEC(ec)) {
                return 0;
        }

        args.chkinfo = chkinfo;
        args.chkstat = chkstat;
        args.ec = *ec;
        args.clocks = clocks;
        args.src_in_err = src_in_err;

        eclog = __eclog_get(volume_proto, &chkinfo->id, ec);
        YASSERT(eclog);

        ret = __eclog_log_redo(eclog, &chkinfo->id, &args);
        if (ret) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Flush task for one log, run through core_request() by the flush worker.
 *
 * The single variadic argument is the eclog_t to flush.  The oldest loaded
 * segment is located with __eclog_md_getmin() and finished segments are
 * retired with __eclog_md_unload().
 *
 * NOTE(review): failures go through USLEEP_RETRY, which is given the
 * `retry` label and counter -- presumably it sleeps and jumps back to
 * `retry` until a limit is hit, then leaves via err_ret; confirm against
 * the macro definition.
 * NOTE(review): va_end() is called on a va_list received as a parameter;
 * confirm this matches the core_request() calling convention.
 */
STATIC int __eclog_log_flush(va_list ap)
{
        int ret, min, retry = 0;
        eclog_t *eclog = va_arg(ap, eclog_t *);

        va_end(ap);

retry:
        ret = __eclog_md_getmin(eclog, &min);
        if (unlikely(ret)) {
                USLEEP_RETRY(err_ret, ret, retry, retry, gloconf.rpc_timeout * 3, (1000 * 1000));
        }

        ret = __eclog_md_unload(eclog, min);
        if (unlikely(ret)) {
                USLEEP_RETRY(err_ret, ret, retry, retry, gloconf.rpc_timeout * 3, (1000 * 1000));
        }

        return 0;
err_ret:
        DWARN("eclog flush fail:%d\n", ret);
        return ret;
}

/*
 * Body of the background flush thread of one volume.
 *
 * Every half ECLOG_FLUSH_TMO seconds the thread walks all log slots of the
 * volume and submits __eclog_log_flush() for each loaded log through
 * core_request().  When the volume's destroy flag is set, the walk is cut
 * short, the flag is set to 2 and the thread exits.  A core_request()
 * failure is treated as fatal (assert).
 *
 * The sleep interval multiplies before dividing: the earlier form
 * ECLOG_FLUSH_TMO / 2 * USEC_PER_SEC evaluated 1 / 2 in integer arithmetic,
 * i.e. 0, so the loop never slept.
 */
static void *__eclog_flush_worker__(void *arg)
{
        int ret;
        uint64_t i, count;
        volume_proto_t *volume_proto = arg;
        eclog_t *eclog;
        ec_t *ec;

        DINFO("volume "CHKID_FORMAT" flush start\n", CHKID_ARG(&volume_proto->chkid));
        ec = &volume_proto->table1.fileinfo.ec;
        YASSERT(EC_ISEC(ec));

        while (1) {
                /* multiply first so a timeout of 1 second yields 500000 us, not 0 */
                usleep(ECLOG_FLUSH_TMO * USEC_PER_SEC / 2);

                count = ECLOG_LOG_COUNT(volume_proto->table1.fileinfo.size, ec);

                for (i = 0; i < count; i++) {
                        if (volume_proto->destroy) {
                                break;
                        }

                        eclog = volume_proto->eclog[i];
                        if (!eclog) {
                                continue;
                        }

                        ret = core_request(core_hash(&volume_proto->chkid), -1, "eclog_flush",
                                        __eclog_log_flush, eclog);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }

                if (volume_proto->destroy) {
                        /* signal that the flush thread is done */
                        volume_proto->destroy = 2;
                        DINFO("volume "CHKID_FORMAT" flush end\n", CHKID_ARG(&volume_proto->chkid));
                        break;
                }
        }

        return NULL;
err_ret:
        YASSERT(0);
        return NULL;
}

/*
 * Start the detached background flush thread for a volume.
 *
 * The thread attribute object is destroyed once pthread_create() has
 * returned, whether or not the thread was created; it used to be left
 * initialised.  Returns 0 on success or the pthread_create() error code.
 */
int __eclog_flush_worker(volume_proto_t *volume_proto)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __eclog_flush_worker__, volume_proto);

        /* the attribute object is no longer needed after thread creation */
        (void) pthread_attr_destroy(&ta);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Bring up all logs of a volume and start its flush thread.
 *
 * Nothing is done (returns 0) for volumes that are not EC or that carry the
 * snapshot attribute.  Otherwise, for every log slot: the log is loaded if
 * it is not in memory yet, fully replayed, and its finished segments are
 * retired.  Finally the background flush worker is started.
 *
 * Returns 0 on success or the first error; logs loaded before the failure
 * stay registered in volume_proto->eclog[].
 */
int eclog_load(volume_proto_t *volume_proto)
{
        int ret, min;
        uint64_t n, total;
        eclog_t *eclog;
        ec_t *ec = &volume_proto->table1.fileinfo.ec;

        if (!EC_ISEC(ec) || volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__) {
                return 0;
        }

        total = ECLOG_LOG_COUNT(volume_proto->table1.fileinfo.size, ec);
        for (n = 0; n < total; n++) {
                eclog = volume_proto->eclog[n];
                if (!eclog) {
                        ret = __eclog_log_load(volume_proto, &eclog, ec, n, TRUE);
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }

                        volume_proto->eclog[n] = eclog;
                }

                /* replay everything, then retire what is no longer needed */
                ret = __eclog_log_redo(eclog, NULL, NULL);
                if (ret) {
                        GOTO(err_ret, ret);
                }

                ret = __eclog_md_getmin(eclog, &min);
                if (ret)
                        GOTO(err_ret, ret);

                ret = __eclog_md_unload(eclog, min);
                if (ret)
                        GOTO(err_ret, ret);
        }

        ret = __eclog_flush_worker(volume_proto);
        if (ret)
                GOTO(err_ret, ret);

        DINFO("eclog. load eclog successful.\n");

        return 0;
err_ret:
        return ret;
}
